package com.example.hotitemanalysis;

import com.example.bean.WaterC;
import com.example.bean.WaterResult;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;

/**
 * 测试数据：
 * 000001,1461756862000
 * 000001,1461756866000
 * 000001,1461756872000
 * 000001,1461756873000
 * 000001,1461756874000
 */
public class WatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 从调用时刻开始给env创建的每一个stream追加时间特征
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //TableEnvironment tabEnv = StreamTableEnvironment.create(env);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 9090);//env.readTextFile(WatermarkTest.class.getResource("/WatermarkTest.csv").getPath());
        SingleOutputStreamOperator<WaterC> outputStreamOperator = inputStream.map(new MapFunction<String, WaterC>() {
            @Override
            public WaterC map(String line) throws Exception {
                String[] lines = line.split("\\W+");
                return new WaterC(lines[0], Long.valueOf(lines[1]));
            }
        });
        SingleOutputStreamOperator<WaterC> soso = outputStreamOperator.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<WaterC>() {
            Long currentMaxTimestamp = 0L;
            //最大允许的乱序时间是10s
            Long maxOutOfOrderness = 10000L;
            Watermark watermark = null;
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                //水位线。当前的时间的前N秒的数据认为达到了。
                watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                //System.out.println("getCurrentWatermark="+watermark.toString());
                return watermark;
            }

            @Override
            public long extractTimestamp(WaterC element, long l) {
                //System.out.println(element+", "+l);
                Long timestamp = element.time;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                System.out.println("       code=" + element.code + ",eventTime=" + element.time + "|format=" + format.format(element.time) + ",currentMaxTimestamp=" + currentMaxTimestamp + "|format=" + format.format(currentMaxTimestamp) + ", watermark=" + watermark.toString());
                return timestamp;
            }
        });

        soso.keyBy("code")
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                .apply(new WindowFunction<WaterC, WaterResult, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<WaterC> iterable, Collector<WaterResult> collector) throws Exception {
                        List<WaterC> list = Lists.newArrayList(iterable.iterator());
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        WaterResult waterRs = new WaterResult(tuple.toString(), list.size(), format.format(list.get(0).time), format.format(list.get(list.size()-1).time), format.format(timeWindow.getStart()), format.format(timeWindow.getEnd()));
                        collector.collect(waterRs);
                    }
                }).print("apply");
        env.execute("job TimeAndWindowAnalysis");
    }

}
